19.4 性能与安全对比

19 分钟阅读

19.4.1 性能对比#

1. 执行性能#

Skills 执行性能

Skills 的执行性能主要受限于 LLM 的推理时间和网络延迟。

性能特征

python
# Skills 执行时间分解 class SkillPerformanceMetrics: def __init__(self): self.metrics = { "intent_parsing": 0, # 意图解析时间 "parameter_extraction": 0, # 参数提取时间 "skill_execution": 0, # 技能执行时间 "result_generation": 0, # 结果生成时间 "total_time": 0 # 总时间 } def measure_execution(self, skill, user_input): """测量执行时间""" start_time = time.time() # 意图解析 intent_start = time.time() intent = self.parse_intent(user_input) self.metrics["intent_parsing"] = time.time() - intent_start # 参数提取 param_start = time.time() parameters = self.extract_parameters(user_input, intent) self.metrics["parameter_extraction"] = time.time() - param_start # 技能执行 exec_start = time.time() result = skill.execute(parameters) self.metrics["skill_execution"] = time.time() - exec_start # 结果生成 gen_start = time.time() formatted_result = self.format_result(result) self.metrics["result_generation"] = time.time() - gen_start self.metrics["total_time"] = time.time() - start_time return formatted_result

典型执行时间(毫秒)

意图解析:300-500ms

参数提取:200-400ms

技能执行:100-300ms

结果生成:400-600ms

总时间:1000-1800ms

性能优化策略

python
class OptimizedSkill(Skill): def __init__(self): super().__init__() self.cache = {} # 缓存常用请求 self.intent_classifier = self.load_intent_classifier() # 预训练分类器 def parse_intent(self, user_input): """优化意图解析""" # 1. 检查缓存 cache_key = self.get_cache_key(user_input) if cache_key in self.cache: return self.cache[cache_key] # 2. 使用预训练分类器(快速) intent = self.intent_classifier.predict(user_input) # 3. 缓存结果 self.cache[cache_key] = intent return intent def execute(self, parameters, context): """优化执行""" # 批处理请求 if isinstance(parameters, list): return self.batch_execute(parameters, context) # 单个请求执行 return super().execute(parameters, context) def batch_execute(self, parameters_list, context): """批量执行""" results = [] for parameters in parameters_list: result = super().execute(parameters, context) results.append(result) return results

插件执行性能

插件的执行性能主要取决于代码效率和系统资源。

性能特征

插件执行时间分解

class PluginPerformanceMetrics: def init(self): self.metrics = { "parameter_validation": 0, # 参数验证时间 "method_execution": 0, # 方法执行时间 "result_processing": 0, # 结果处理时间 "total_time": 0 # 总时间 } def measure_execution(self, plugin, method_name, parameters): """测量执行时间""" start_time = time.time()

参数验证

valid_start = time.time() self.validate_parameters(plugin, method_name, parameters) self.metrics["parameter_validation"] = time.time() - valid_start

方法执行

exec_start = time.time() result = getattr(plugin, method_name)(**parameters) self.metrics["method_execution"] = time.time() - exec_start

结果处理

proc_start = time.time() formatted_result = self.process_result(result) self.metrics["result_processing"] = time.time() - proc_start self.metrics["total_time"] = time.time() - start_time return formatted_result

典型执行时间(毫秒)

参数验证:5-15ms

方法执行:20-100ms

结果处理:5-10ms

总时间:30-125ms

bash
**性能优化策略**:
```python

class OptimizedPlugin(Plugin):
    def __init__(self):
        super().__init__()
        self.cache = LRUCache(maxsize=1000)  # LRU 缓存
        self.connection_pool = ConnectionPool(size=10)  # 连接池
        self.thread_pool = ThreadPoolExecutor(max_workers=4)  # 线程池

    def execute(self, method_name, parameters):
        """优化执行"""
        # 1. 检查缓存
        cache_key = self.get_cache_key(method_name, parameters)
        cached_result = self.cache.get(cache_key)
        if cached_result:
            return cached_result

        # 2. 使用连接池
        with self.connection_pool.get_connection() as conn:
            # 3. 异步执行
            future = self.thread_pool.submit(
                self._execute_method,
                method_name,
                parameters,
                conn
            )
            result = future.result(timeout=10)

        # 4. 缓存结果
        self.cache.set(cache_key, result)

        return result

    def _execute_method(self, method_name, parameters, conn):
        """执行方法"""
        method = getattr(self, method_name)
        return method(**parameters, connection=conn)

### 2. 并发性能

#### Skills 并发性能

Skills 的并发性能受限于 LLM API 的并发限制。

class SkillConcurrencyManager:
def __init__(self, max_concurrent=5):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.request_queue = asyncio.Queue()
async def execute_skill(self, skill, user_input):
"""异步执行技能"""
async with self.semaphore:
return await skill.execute_async(user_input)
async def batch_execute(self, skill, user_inputs):
"""批量执行"""
tasks = [
self.execute_skill(skill, user_input)
for user_input in user_inputs
]
return await asyncio.gather(*tasks)

性能指标

单个请求:1000-1800ms

并发 5 个:1200-2000ms(每个)

并发 10 个:1500-2500ms(每个)

并发 20 个:2000-3500ms(每个)

bash
#### 插件并发性能
插件的并发性能主要取决于系统资源和代码设计。
```python

class PluginConcurrencyManager:
    def __init__(self, max_workers=10):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.async_executor = ProcessPoolExecutor(max_workers=max_workers)

    def execute_plugin(self, plugin, method_name, parameters):
        """同步执行"""
        future = self.executor.submit(
            self._execute_sync,
            plugin,
            method_name,
            parameters
        )
        return future.result(timeout=30)

    async def execute_plugin_async(self, plugin, method_name, parameters):
        """异步执行"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self._execute_sync,
            plugin,
            method_name,
            parameters
        )

    def batch_execute(self, plugin, method_name, parameters_list):
        """批量执行"""
        futures = [
            self.executor.submit(
                self._execute_sync,
                plugin,
                method_name,
                parameters
            )
            for parameters in parameters_list
        ]
        return [future.result() for future in futures]

性能指标

单个请求:30-125ms

并发 10 个:40-150ms(每个)

并发 50 个:60-200ms(每个)

并发 100 个:100-300ms(每个)

3. 资源消耗#

Skills 资源消耗

class SkillResourceMonitor: def init(self): self.metrics = { "cpu_usage": [], # CPU 使用率 "memory_usage": [], # 内存使用 "network_io": [], # 网络 I/O "api_calls": 0 # API 调用次数 } def monitor_execution(self, skill, user_input): """监控执行过程""" import psutil

记录初始状态

process = psutil.Process() initial_cpu = process.cpu_percent() initial_memory = process.memory_info().rss

执行技能

result = skill.execute(user_input)

记录最终状态

final_cpu = process.cpu_percent() final_memory = process.memory_info().rss

计算资源消耗

self.metrics["cpu_usage"].append(final_cpu - initial_cpu) self.metrics["memory_usage"].append(final_memory - initial_memory) self.metrics["api_calls"] += 1 return result

典型资源消耗

CPU 使用率:10-30%

内存使用:100-500MB

网络 I/O:1-5MB/请求

API 调用:2-4 次/请求

bash
#### 插件资源消耗
```python

class PluginResourceMonitor:
    def __init__(self):
        self.metrics = {
            "cpu_usage": [],      # CPU 使用率
            "memory_usage": [],    # 内存使用
            "disk_io": [],        # 磁盘 I/O
            "network_io": []      # 网络 I/O
        }

    def monitor_execution(self, plugin, method_name, parameters):
        """监控执行过程"""
        import psutil

        # 记录初始状态
        process = psutil.Process()
        initial_cpu = process.cpu_percent()
        initial_memory = process.memory_info().rss
        initial_disk_io = process.io_counters()
        initial_network_io = psutil.net_io_counters()

        # 执行插件方法
        result = getattr(plugin, method_name)(**parameters)

        # 记录最终状态
        final_cpu = process.cpu_percent()
        final_memory = process.memory_info().rss
        final_disk_io = process.io_counters()
        final_network_io = psutil.net_io_counters()

        # 计算资源消耗
        self.metrics["cpu_usage"].append(final_cpu - initial_cpu)
        self.metrics["memory_usage"].append(final_memory - initial_memory)
        self.metrics["disk_io"].append(
            final_disk_io.write_bytes - initial_disk_io.write_bytes
        )
        self.metrics["network_io"].append(
            final_network_io.bytes_sent - initial_network_io.bytes_sent
        )

        return result

典型资源消耗

CPU 使用率:5-15%

内存使用:50-200MB

磁盘 I/O:100KB-10MB/请求

网络 I/O:100KB-1MB/请求

4. 性能对比表#

性能指标Skills插件
单次执行时间1000-1800ms30-125ms
并发能力低(5-10)高(50-100+)
CPU 使用率10-30%5-15%
内存使用100-500MB50-200MB
网络 I/O1-5MB/请求100KB-1MB/请求
可扩展性
吞吐量
延迟

19.4.2 安全对比#

1. 输入安全#

Skills 输入安全

Skills 需要防范提示注入攻击和敏感信息泄露。

class SkillInputSecurity: def init(self): self.injection_patterns = [ r"ignore.*instructions", r"forget.*previous", r"new.*instructions", r"override.*system" ] self.sensitive_keywords = [ "password", "token", "api_key", "secret", "credential" ] def sanitize_input(self, user_input): """清理输入""" import re

1. 检测注入攻击

for pattern in self.injection_patterns: if re.search(pattern, user_input, re.IGNORECASE): raise SecurityError("检测到潜在的注入攻击")

2. 移除敏感信息

sanitized = user_input for keyword in self.sensitive_keywords:

使用占位符替换敏感信息

pattern = rf"{keyword}\s*[:=]\s*[^\s]+" sanitized = re.sub(pattern, f"{keyword}=REDACTED", sanitized, flags=re.IGNORECASE)

3. 限制输入长度

max_length = 10000 if len(sanitized) > max_length: sanitized = sanitized[:max_length] return sanitized def validate_input(self, user_input): """验证输入"""

检查恶意代码

if self.contains_malicious_code(user_input): raise SecurityError("输入包含恶意代码")

检查特殊字符

if self.contains_dangerous_characters(user_input): raise SecurityError("输入包含危险字符") return True

bash
#### 插件输入安全
插件通过类型系统和验证规则确保输入安全。
```python

from pydantic import BaseModel, validator, constr
from typing import Optional

class SecureInput(BaseModel):
    """安全输入模型"""
    username: constr(min_length=3, max_length=50)
    email: str
    message: constr(max_length=1000)

    @validator('username')
    def username_alphanumeric(cls, v):
        if not v.isalnum():
            raise ValueError('用户名只能包含字母和数字')
        return v

    @validator('email')
    def email_valid(cls, v):
        import re
        if not re.match(r'^[^@]+@[^@]+\.[^@]+$', v):
            raise ValueError('邮箱格式无效')
        return v

    @validator('message')
    def message_safe(cls, v):
        # 检查 XSS 攻击
        dangerous_patterns = ['<script>', 'javascript:', 'onerror=']
        for pattern in dangerous_patterns:
            if pattern.lower() in v.lower():
                raise ValueError('消息包含潜在危险内容')
        return v

class PluginInputSecurity:
    def __init__(self):
        self.input_model = SecureInput

    def validate_input(self, parameters):
        """验证输入"""
        try:
            validated = self.input_model(**parameters)
            return validated.dict()
        except Exception as e:
            raise SecurityError(f"输入验证失败: {str(e)}")

### 2. 输出安全

#### Skills 输出安全

Skills 需要防止输出敏感信息和有害内容。

class SkillOutputSecurity:
def __init__(self):
self.sensitive_patterns = [
r"password\s*[:=]\s*\S+",
r"token\s*[:=]\s*\S+",
r"api[_-]?key\s*[:=]\s*\S+"
]
self.harmful_categories = [
"violence", "hate", "sexual",
"self-harm", "illegal"
]
def sanitize_output(self, output):
"""清理输出"""
import re
# 1. 移除敏感信息
sanitized = output
for pattern in self.sensitive_patterns:
sanitized = re.sub(pattern, "***REDACTED***", sanitized, flags=re.IGNORECASE)
# 2. 检查有害内容
if self.contains_harmful_content(sanitized):
sanitized = self.add_warning(sanitized, "内容可能包含有害信息")
# 3. 限制输出长度
max_length = 50000
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "\n\n[输出已截断]"
return sanitized
def contains_harmful_content(self, text):
"""检查有害内容"""

使用内容审核 API

这里简化实现

harmful_keywords = ["暴力", "仇恨", "非法"] for keyword in harmful_keywords: if keyword in text: return True return False def add_warning(self, text, warning): """添加警告""" return f"[警告: {warning}]\n\n{text}"

bash
#### 插件输出安全
插件通过数据过滤和编码确保输出安全。
```python

class PluginOutputSecurity:
    def __init__(self):
        self.html_escape_map = {
            '&': '&amp;',
            '<': '&lt;',
            '>': '&gt;',
            '"': '&quot;',
bash
        "'": '&#x27;'
    }

def sanitize_output(self, output):
    """清理输出"""
    # 1. HTML 转义
    if isinstance(output, str):
        output = self.html_escape(output)

    # 2. JSON 编码
    if isinstance(output, dict):
        output = self.json_encode(output)

    # 3. 移除敏感字段
    if isinstance(output, dict):
        output = self.remove_sensitive_fields(output)

    return output

def html_escape(self, text):
    """HTML 转义"""
    result = []
    for char in text:
        result.append(self.html_escape_map.get(char, char))
    return ''.join(result)

def json_encode(self, data):
    """JSON 编码"""
    import json
    return json.dumps(data, ensure_ascii=False)

def remove_sensitive_fields(self, data):
    """移除敏感字段"""
    sensitive_fields = ['password', 'token', 'secret']
    return {
        k: v for k, v in data.items()
        if k not in sensitive_fields
    }

3. 访问控制#

Skills 访问控制

Skills 的访问控制相对宽松,主要依赖 LLM 的理解能力。

class SkillAccessControl: def init(self): self.permissions = { "admin": ["all"], "user": ["read", "write"], "guest": ["read"] } def check_permission(self, user, skill_name, action): """检查权限""" user_role = user.get("role", "guest") user_permissions = self.permissions.get(user_role, []) if "all" in user_permissions: return True if action in user_permissions: return True raise PermissionError(f"用户 {user['username']} 无权限执行 {action}") def filter_sensitive_data(self, user, data): """过滤敏感数据""" user_role = user.get("role", "guest") if user_role == "admin": return data

移除敏感字段

filtered = data.copy() sensitive_fields = ["internal_notes", "admin_only"] for field in sensitive_fields: filtered.pop(field, None) return filtered

bash
#### 插件访问控制
插件可以实现细粒度的访问控制。
```python

from functools import wraps

class PluginAccessControl:
    def __init__(self):
        self.roles = {
            "admin": {"priority": 100},
            "user": {"priority": 50},
            "guest": {"priority": 10}
        }
        self.permissions = {}

    def require_permission(self, permission):
        """权限装饰器"""
        def decorator(func):
            @wraps(func)
            def wrapper(self, *args, **kwargs):
                user = kwargs.get('user', None)
                if not user:
                    raise PermissionError("需要用户认证")

                if not self.has_permission(user, permission):
                    raise PermissionError(f"缺少权限: {permission}")

                return func(self, *args, **kwargs)
            return wrapper
        return decorator

    def has_permission(self, user, permission):
        """检查权限"""
        user_role = user.get("role", "guest")
        role_info = self.roles.get(user_role, {})
        user_permissions = self.permissions.get(user_role, set())

        return permission in user_permissions or "all" in user_permissions

    def add_permission(self, role, permission):
        """添加权限"""
        if role not in self.permissions:
            self.permissions[role] = set()
        self.permissions[role].add(permission)

# 使用示例
class SecurePlugin(Plugin):
    def __init__(self):
        super().__init__()
        self.access_control = PluginAccessControl()

    @PluginAccessControl.require_permission("read")
    def read_data(self, user, data_id):
        """读取数据"""
        return self._read_data(data_id)

    @PluginAccessControl.require_permission("write")
    def write_data(self, user, data_id, data):
        """写入数据"""
        return self._write_data(data_id, data)

### 4. 审计日志

#### Skills 审计日志

class SkillAuditLogger:
def __init__(self):
self.logs = []
def log_execution(self, skill_name, user_input, result, user):
"""记录执行"""
log_entry = {
"timestamp": datetime.now().isoformat(),
"skill_name": skill_name,
"user": user.get("username", "anonymous"),
"input": self.sanitize_input(user_input),
"output": self.sanitize_output(result),
"success": result.get("success", False),
"duration": result.get("duration", 0)
}
self.logs.append(log_entry)
def get_logs(self, user=None, skill_name=None, start_time=None, end_time=None):
"""获取日志"""
filtered_logs = self.logs
if user:
filtered_logs = [log for log in filtered_logs if log["user"] == user]
if skill_name:
filtered_logs = [log for log in filtered_logs if log["skill_name"] == skill_name]
if start_time:
filtered_logs = [log for log in filtered_logs if log["timestamp"] >= start_time]
if end_time:
filtered_logs = [log for log in filtered_logs if log["timestamp"] <= end_time]
return filtered_logs

插件审计日志

python
class PluginAuditLogger: def __init__(self): self.logs = [] self.max_logs = 10000 def log_execution(self, plugin_name, method_name, parameters, result, user): """记录执行""" log_entry = { "timestamp": datetime.now().isoformat(), "plugin_name": plugin_name, "method_name": method_name, "user": user.get("username", "anonymous"), "parameters": self.sanitize_parameters(parameters), "result": self.sanitize_result(result), "success": result.get("success", False), "duration": result.get("duration", 0), "ip_address": user.get("ip_address", "unknown") } self.logs.append(log_entry) # 限制日志数量 if len(self.logs) > self.max_logs: self.logs = self.logs[-self.max_logs:] def export_logs(self, format="json"): """导出日志""" if format == "json": import json return json.dumps(self.logs, indent=2) elif format == "csv": import csv import io output = io.StringIO() writer = csv.DictWriter(output, fieldnames=self.logs[0].keys()) writer.writeheader() writer.writerows(self.logs) return output.getvalue()

5. 安全对比表#

安全特性Skills插件
输入验证
输出过滤
访问控制
审计日志
类型安全
注入防护
敏感信息保护
代码审查困难容易
安全测试困难容易
合规性

标记本节教程为已读

记录您的学习进度,方便后续查看。